// Copyright 2020 Zhizhesihai (Beijing) Technology Limited.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package zetta

import (
	"container/list"
	"fmt"
	"log"
	"strings"
	"sync"
	"time"

	"github.com/zhihu/zetta-client-go/utils/retry"
	tspb "github.com/zhihu/zetta-proto/pkg/tablestore"
	"golang.org/x/net/context"
	"google.golang.org/grpc/codes"
)

// sessionHandle is an interface for transactions to access Cloud Spanner sessions safely. It is generated by sessionPool.take().
type sessionHandle struct {
	// mu guarantees that inner session object is returned / destroyed only once.
	mu sync.Mutex
	// session is a pointer to a session object. Transactions never need to access it directly.
	session *session
}

// recycle gives the inner session object back to its home session pool. It is safe to call recycle multiple times but only the first one would take effect.
func (sh *sessionHandle) recycle() {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		// sessionHandle has already been recycled.
		return
	}
	sh.session.recycle()
	sh.session = nil
}

// getID gets the Cloud Spanner session ID from the internal session object. getID returns empty string if the sessionHandle is nil or the inner session
// object has been released by recycle / destroy.
func (sh *sessionHandle) getID() string {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		// sessionHandle has already been recycled/destroyed.
		return ""
	}
	return sh.session.getID()
}

// getClient gets the Cloud Spanner RPC client associated with the session ID in sessionHandle.
func (sh *sessionHandle) getClient() tspb.TablestoreClient {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		return nil
	}
	return sh.session.client
}

// getTransactionID returns the transaction id in the session if available.
func (sh *sessionHandle) getTransactionID() transactionID {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		return nil
	}
	return sh.session.tx
}

// destroy destroys the inner session object. It is safe to call destroy multiple times and only the first call would attempt to
// destroy the inner session object.
func (sh *sessionHandle) destroy() {
	sh.mu.Lock()
	s := sh.session
	sh.session = nil
	sh.mu.Unlock()
	if s == nil {
		// sessionHandle has already been destroyed.
		return
	}
	s.destroy(false)
}

//
// session wraps a Zetta Server session ID through which transactions are created and executed.
//
type session struct {
	// one session lead to one database
	db string
	// pb client is the RPC channel to Server
	client tspb.TablestoreClient
	// id is the unique id of the session in Server, allocate in creation
	id string
	// createTime is the timestamp of the session's creation
	createTime time.Time

	mu    sync.Mutex
	valid bool

	// tx contains the transaction id if the session has been prepared for write
	tx transactionID

	// pool fileds
	// pool is the session's home session pool where it was created. It is set only once during session's creation.
	pool *sessionPool
	// hcIndex is the index of the session inside the global healthcheck queue. If hcIndex < 0, session has been unregistered from the queue.
	hcIndex int
	// idleList is the linkedlist node which links the session to its home session pool's idle list. If idleList == nil, the
	// session is not in idle list.
	idleList *list.Element
	// nextCheck is the timestamp of next scheduled healthcheck of the session. It is maintained by the global health checker.
	nextCheck time.Time
	// checkingHelath is true if currently this session is being processed by health checker. Must be modified under health checker lock.
	checkingHealth bool
}

func (s *session) isValid() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.valid
}

// isWritePrepared returns true if the session is prepared for write.
func (s *session) isWritePrepared() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.tx != nil
}

func (s *session) String() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return fmt.Sprintf("[session]: id=%v, valid=%v, create=%v", s.id, s.valid, s.createTime)
}

// ping verifies if the session is still alive in Server
func (s *session) ping() error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	return retry.Invoke(ctx, func(ctx context.Context, settings retry.CallSettings) error {
		_, err := s.client.GetSession(ctx, &tspb.GetSessionRequest{Name: s.getID()})
		return err
	})
}

// invalidate marks a session as invalid and returns the old validity.
func (s *session) invalidate() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	ov := s.valid
	s.valid = false
	return ov
}

// setTransactionID sets the transaction id in the session
func (s *session) setTransactionID(tx transactionID) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.tx = tx
}

// getID returns the session ID which uniquely identifies the session in Server
func (s *session) getID() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.id
}

func (s *session) getClient() tspb.TablestoreClient {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.client
}

func (s *session) getTransactionID() transactionID {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.tx
}

// destroy removes the session from Server
func (s *session) destroy(isExpire bool) bool {
	// Remove s from session pool.
	if !s.pool.remove(s, isExpire) {
		return false
	}
	// Unregister s from healthcheck queue.
	s.pool.hc.unregister(s)
	// Remove s from Cloud Spanner service.
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	s.delete(ctx)
	return true
}

func (s *session) delete(ctx context.Context) {
	// Ignore the error because even if we fail to explicitly destroy the
	// session, it will be eventually garbage collected by Cloud Spanner.
	if err := retry.Invoke(ctx, func(ctx context.Context, settings retry.CallSettings) error {
		_, e := s.client.DeleteSession(ctx, &tspb.DeleteSessionRequest{Name: s.getID()})
		return e
	}); err != nil {
		if err != nil {
			log.Printf("Failed to delete session %v. Error: %v", s.getID(), err)
		}
	}
}

// prepareForWrite prepares the session for write if it is not already in that state.
func (s *session) prepareForWrite(ctx context.Context) error {
	if s.isWritePrepared() {
		return nil
	}
	tx, err := beginTransaction(ctx, s.getID(), s.client)
	if err != nil {
		return err
	}
	s.setTransactionID(tx)
	return nil
}

func (s *session) recycle() {
	s.setTransactionID(nil)
	if !s.pool.recycle(s) {
		// s is rejected by its home session pool because it expired and the
		// session pool currently has enough open sessions.
		s.destroy(false)
	}
}

//
// functions for session pool
//

// refreshIdle refreshes the session's session ID if it is in its home session pool's idle list
// and returns true if successful.
func (s *session) refreshIdle() bool {
	s.mu.Lock()
	validAndIdle := s.valid && s.idleList != nil
	s.mu.Unlock()
	if !validAndIdle {
		// Optimization: return early if s is not valid or if s is not in idle list.
		return false
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	var sid string
	err := retry.Invoke(ctx, func(ctx context.Context, settings retry.CallSettings) error {
		session, e := s.client.CreateSession(ctx, &tspb.CreateSessionRequest{Database: s.pool.db})
		if e != nil {
			return e
		}
		sid = session.Name
		return nil
	})
	if err != nil {
		return false
	}
	s.pool.mu.Lock()
	s.mu.Lock()
	var recycle bool
	if s.valid && s.idleList != nil {
		// session is in idle list, refresh its session id.
		sid, s.id = s.id, sid
		if s.tx != nil {
			s.tx = nil
			s.pool.idleWriteList.Remove(s.idleList)
			// We need to put this session back into the pool.
			recycle = true
		}
	}
	s.mu.Unlock()
	s.pool.mu.Unlock()
	if recycle {
		s.pool.recycle(s)
	}
	// If we fail to explicitly destroy the session, it will be eventually garbage collected by
	// Cloud Spanner.
	if err = retry.Invoke(ctx, func(ctx context.Context, settings retry.CallSettings) error {
		_, e := s.client.DeleteSession(ctx, &tspb.DeleteSessionRequest{Name: sid})
		return e
	}); err != nil {
		return false
	}
	return true
}

// setHcIndex atomically sets the session's index in the healthcheck queue and returns the old index.
func (s *session) setHcIndex(i int) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	oi := s.hcIndex
	s.hcIndex = i
	return oi
}

// setIdleList atomically sets the session's idle list link and returns the old link.
func (s *session) setIdleList(le *list.Element) *list.Element {
	s.mu.Lock()
	defer s.mu.Unlock()
	old := s.idleList
	s.idleList = le
	return old
}

// setNextCheck sets the timestamp for next healthcheck on the session.
func (s *session) setNextCheck(t time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.nextCheck = t
}

// getHcIndex returns the session's index into the global healthcheck priority queue.
func (s *session) getHcIndex() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.hcIndex
}

// getIdleList returns the session's link in its home session pool's idle list.
func (s *session) getIdleList() *list.Element {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.idleList
}

// getNextCheck returns the timestamp for next healthcheck on the session.
func (s *session) getNextCheck() time.Time {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.nextCheck
}

// shouldDropSession returns true if a particular error leads to the removal of a session
func shouldDropSession(err error) bool {
	if err == nil {
		return false
	}
	// If a Cloud Spanner can no longer locate the session (for example, if session is garbage collected), then caller
	// should not try to return the session back into the session pool.
	// TODO: once gRPC can return auxilary error information, stop parsing the error message.
	if ErrCode(err) == codes.NotFound && strings.Contains(ErrDesc(err), "Session not found:") {
		return true
	}
	return false
}
